跳到主要内容

MongoDB 的索引

索引基础概念题

1. MongoDB 索引的本质和工作原理

面试官: 请解释 MongoDB 索引的底层实现原理,以及它是如何提高查询性能的?

工作原理时序图:

Go 代码示例:

// 创建单字段索引
indexModel := mongo.IndexModel{
Keys: bson.D{{"age", 1}}, // 1表示升序,-1表示降序
}

// 创建复合索引
compoundIndex := mongo.IndexModel{
Keys: bson.D{{"user_id", 1}, {"created_at", -1}},
Options: options.Index().SetBackground(true),
}

// 创建哈希索引
hashIndex := mongo.IndexModel{
Keys: bson.D{{"email", "hashed"}},
}

2. 索引类型详解

面试官: MongoDB 支持哪些类型的索引?每种索引适用于什么场景?

各类型详细说明和 Go 示例:

// 1. 文本索引 - 全文搜索
textIndex := mongo.IndexModel{
Keys: bson.D{{"title", "text"}, {"content", "text"}},
Options: options.Index().SetDefaultLanguage("english"),
}

// 2. 地理空间索引 - 位置查询
geoIndex := mongo.IndexModel{
Keys: bson.D{{"location", "2dsphere"}},
}

// 3. 部分索引 - 只对满足条件的文档建索引
partialIndex := mongo.IndexModel{
Keys: bson.D{{"email", 1}},
Options: options.Index().SetPartialFilterExpression(
bson.D{{"email", bson.D{{"$exists", true}}}},
),
}

// 4. TTL索引 - 自动过期
ttlIndex := mongo.IndexModel{
Keys: bson.D{{"createdAt", 1}},
Options: options.Index().SetExpireAfterSeconds(3600), // 1小时后过期
}

// 5. 通配符索引 - 动态字段
wildcardIndex := mongo.IndexModel{
Keys: bson.D{{"metadata.$**", 1}},
}

复合索引与查询优化题

3. 复合索引的顺序原则

面试官: 复合索引中字段的顺序为什么很重要?请解释 ESR 原则,并给出实际的 Go 代码示例。

索引使用效果对比:

Go 实践示例:

type User struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
Status string `bson:"status"`
Name string `bson:"name"`
Age int `bson:"age"`
City string `bson:"city"`
CreateAt time.Time `bson:"created_at"`
}

// 根据 ESR 原则创建索引
// 查询: db.users.find({status: "active", name: "John"}).sort({created_at: -1})
esr_index := mongo.IndexModel{
Keys: bson.D{
{"status", 1}, // E: 等值查询
{"created_at", -1}, // S: 排序字段
{"name", 1}, // 可能的等值查询
},
Options: options.Index().SetName("esr_status_created_name"),
}

// 错误示例 - 范围查询放在前面
bad_index := mongo.IndexModel{
Keys: bson.D{
{"age", 1}, // R: 范围查询不应该放在前面
{"status", 1}, // E: 等值查询
{"created_at", -1}, // S: 排序
},
}

4. 索引的前缀匹配规则

面试官: 请解释复合索引的前缀匹配规则,以及如何利用这个特性优化查询?

Go 代码示例:

// 创建复合索引
compoundIndex := mongo.IndexModel{
Keys: bson.D{{"user_id", 1}, {"status", 1}, {"created_at", -1}},
}

// 可以有效使用索引的查询
func efficientQueries(collection *mongo.Collection) {
// 查询1: 使用 user_id (索引前缀)
filter1 := bson.D{{"user_id", "123"}}

// 查询2: 使用 user_id + status (索引前缀)
filter2 := bson.D{{"user_id", "123"}, {"status", "active"}}

// 查询3: 使用完整索引
filter3 := bson.D{
{"user_id", "123"},
{"status", "active"},
{"created_at", bson.D{{"$gte", time.Now().AddDate(0, -1, 0)}}},
}
}

// 无法有效使用索引的查询
func inefficientQueries(collection *mongo.Collection) {
// 查询1: 跳过了 user_id,从 status 开始
filter1 := bson.D{{"status", "active"}}

// 查询2: 跳过了前面的字段
filter2 := bson.D{{"created_at", bson.D{{"$gte", time.Now()}}}}
}

高级索引优化题

5. 索引性能分析和监控

面试官: 如何在 Go 应用中监控和分析 MongoDB 索引的性能?请说明 explain() 的使用方法。

Go 代码实现:

type QueryAnalyzer struct {
collection *mongo.Collection
}

// 执行查询分析
func (qa *QueryAnalyzer) AnalyzeQuery(filter bson.D) (*QueryStats, error) {
// 获取执行计划
cursor, err := qa.collection.Find(
context.TODO(),
filter,
options.Find().SetExplain(true),
)
if err != nil {
return nil, err
}

var explainResult bson.M
if cursor.Next(context.TODO()) {
err = cursor.Decode(&explainResult)
if err != nil {
return nil, err
}
}

return qa.parseExplainResult(explainResult), nil
}

type QueryStats struct {
IndexUsed bool `json:"index_used"`
IndexName string `json:"index_name"`
DocsExamined int `json:"docs_examined"`
DocsReturned int `json:"docs_returned"`
ExecutionTimeMS int `json:"execution_time_ms"`
QueryPlan string `json:"query_plan"`
}

// 性能监控中间件
func (qa *QueryAnalyzer) MonitorSlowQueries(threshold time.Duration) {
// 设置数据库性能分析器
result := qa.collection.Database().RunCommand(
context.TODO(),
bson.D{
{"profile", 2}, // 记录所有操作
{"slowms", int(threshold.Milliseconds())},
},
)

if result.Err() != nil {
log.Printf("Failed to set profiler: %v", result.Err())
}
}

6. 索引策略设计模式

面试官: 在设计微服务架构时,如何制定 MongoDB 的索引策略?请结合具体业务场景说明。

电商系统索引设计示例:

// 用户服务 - 用户集合索引设计
type UserService struct {
collection *mongo.Collection
}

func (us *UserService) CreateIndexes() error {
indexes := []mongo.IndexModel{
// 1. 用户登录查询 - 高频查询
{
Keys: bson.D{{"email", 1}},
Options: options.Index().
SetUnique(true).
SetName("email_unique"),
},

// 2. 用户状态查询 - 管理后台
{
Keys: bson.D{{"status", 1}, {"created_at", -1}},
Options: options.Index().SetName("status_created"),
},

// 3. 地理位置查询 - 附近用户
{
Keys: bson.D{{"location", "2dsphere"}},
Options: options.Index().SetName("location_geo"),
},

// 4. 文本搜索 - 用户搜索
{
Keys: bson.D{{"username", "text"}, {"full_name", "text"}},
Options: options.Index().
SetDefaultLanguage("english").
SetName("user_text_search"),
},
}

_, err := us.collection.Indexes().CreateMany(context.TODO(), indexes)
return err
}

// 订单服务 - 订单集合索引设计
type OrderService struct {
collection *mongo.Collection
}

func (os *OrderService) CreateIndexes() error {
indexes := []mongo.IndexModel{
// 1. 用户订单查询 - 最高频
{
Keys: bson.D{{"user_id", 1}, {"created_at", -1}},
Options: options.Index().SetName("user_orders"),
},

// 2. 订单状态处理 - 业务流程
{
Keys: bson.D{{"status", 1}, {"updated_at", -1}},
Options: options.Index().SetName("status_processing"),
},

// 3. 支付状态查询 - 支付回调
{
Keys: bson.D{{"payment_id", 1}},
Options: options.Index().
SetSparse(true).
SetName("payment_callback"),
},

// 4. TTL索引 - 自动清理过期订单
{
Keys: bson.D{{"expires_at", 1}},
Options: options.Index().
SetExpireAfterSeconds(0).
SetName("order_ttl"),
},
}

_, err := os.collection.Indexes().CreateMany(context.TODO(), indexes)
return err
}

实际问题解决题

7. 索引对写入性能的影响

面试官: 索引会如何影响写入性能?在高并发写入场景下如何平衡查询和写入性能?

Go 实现示例:

type BatchWriter struct {
collection *mongo.Collection
batchSize int
buffer []interface{}
mu sync.Mutex
}

func NewBatchWriter(collection *mongo.Collection, batchSize int) *BatchWriter {
bw := &BatchWriter{
collection: collection,
batchSize: batchSize,
buffer: make([]interface{}, 0, batchSize),
}

// 启动后台批量写入
go bw.backgroundFlush()
return bw
}

func (bw *BatchWriter) Add(doc interface{}) error {
bw.mu.Lock()
defer bw.mu.Unlock()

bw.buffer = append(bw.buffer, doc)

if len(bw.buffer) >= bw.batchSize {
return bw.flush()
}

return nil
}

func (bw *BatchWriter) flush() error {
if len(bw.buffer) == 0 {
return nil
}

// 使用有序写入,提高性能
opts := options.InsertMany().SetOrdered(false)

_, err := bw.collection.InsertMany(context.TODO(), bw.buffer, opts)
if err != nil {
return err
}

// 清空缓冲区
bw.buffer = bw.buffer[:0]
return nil
}

// 后台定期刷新
func (bw *BatchWriter) backgroundFlush() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for range ticker.C {
bw.mu.Lock()
bw.flush()
bw.mu.Unlock()
}
}

// 索引构建策略
func (bw *BatchWriter) CreateIndexesWithStrategy() error {
indexes := []mongo.IndexModel{
// 在线构建,不阻塞写入
{
Keys: bson.D{{"user_id", 1}},
Options: options.Index().SetBackground(true),
},

// 部分索引,减少索引大小
{
Keys: bson.D{{"status", 1}},
Options: options.Index().
SetPartialFilterExpression(
bson.D{{"status", bson.D{{"$in", []string{"active", "pending"}}}}},
).
SetBackground(true),
},
}

_, err := bw.collection.Indexes().CreateMany(context.TODO(), indexes)
return err
}

8. 索引维护和监控

面试官: 在生产环境中,如何进行索引的维护和监控?请设计一个索引健康检查系统。

下面是相关的指标说明:

指标说明监控意义
Index Name (索引名)唯一标识索引方便定位问题索引
Collection (所属集合)索引所在集合索引与表/集合的对应关系
Size (索引大小)索引占用的存储空间索引过大可能影响内存占用和查询性能
Usage Count (使用次数)索引被查询优化器使用的次数判断索引是否真正被利用
Hit Ratio (命中率)命中次数 / 总查询次数判断索引效率,避免“假用”情况
Last Used (最后使用时间)最后一次查询使用索引的时间辨别长期未使用的索引
Build Time / Rebuild Time建立索引所耗费的时间大索引的重建可能拖慢维护窗口
Access Patterns (访问模式)查询语句中如何利用索引判断索引是否符合业务需求
Read / Write Overhead索引对写入的影响(写入时需要维护额外索引)避免冗余索引增加写入延迟
Fragmentation (碎片率)索引内部碎片程度高碎片率需要考虑 reIndex
Cache Residency (缓存命中率)索引是否大部分在内存中索引过大、无法驻留内存可能导致查询变慢
Compound Index Coverage (复合索引覆盖率)是否用于覆盖查询 (Covering Query)合理设计索引字段顺序提升性能
Index Type (索引类型)普通索引、唯一索引、TTL 索引、地理位置索引、全文索引等不同索引类型有不同的维护方式和监控点
Duplicate Index Check (冗余检测)是否存在重复、子集、冗余索引及时优化,减少存储和写入开销
Selectivity (选择性)索引区分度(索引字段唯一值/总行数)区分度低的索引往往作用有限

Go 实现的索引监控系统:

type IndexMonitor struct {
client *mongo.Client
database string
}

type IndexHealth struct {
IndexName string `json:"index_name"`
Collection string `json:"collection"`
Size int64 `json:"size_bytes"`
UsageCount int64 `json:"usage_count"`
HitRatio float64 `json:"hit_ratio"`
LastUsed time.Time `json:"last_used"`
Efficiency string `json:"efficiency"` // "good", "poor", "unused"
}

func (im *IndexMonitor) CheckIndexHealth() ([]IndexHealth, error) {
db := im.client.Database(im.database)
collections, err := db.ListCollectionNames(context.TODO(), bson.D{})
if err != nil {
return nil, err
}

var healthReports []IndexHealth

for _, collName := range collections {
coll := db.Collection(collName)

// 获取索引信息
indexes, err := coll.Indexes().List(context.TODO())
if err != nil {
continue
}

for indexes.Next(context.TODO()) {
var indexInfo bson.M
if err := indexes.Decode(&indexInfo); err != nil {
continue
}

// 获取索引统计信息
stats, err := im.getIndexStats(coll, indexInfo["name"].(string))
if err != nil {
continue
}

health := IndexHealth{
IndexName: indexInfo["name"].(string),
Collection: collName,
Size: stats.Size,
UsageCount: stats.UsageCount,
HitRatio: stats.HitRatio,
LastUsed: stats.LastUsed,
Efficiency: im.calculateEfficiency(stats),
}

healthReports = append(healthReports, health)
}
}

return healthReports, nil
}

type IndexStats struct {
Size int64
UsageCount int64
HitRatio float64
LastUsed time.Time
}

func (im *IndexMonitor) getIndexStats(coll *mongo.Collection, indexName string) (*IndexStats, error) {
// 获取索引统计信息
result := coll.Database().RunCommand(context.TODO(), bson.D{
{"collStats", coll.Name()},
{"indexDetails", true},
})

var stats bson.M
if err := result.Decode(&stats); err != nil {
return nil, err
}

// 解析统计信息
indexStats := &IndexStats{
Size: 0, // 从 stats 中解析
UsageCount: 0, // 从 stats 中解析
HitRatio: 0, // 计算命中率
LastUsed: time.Now(), // 从 stats 中解析
}

return indexStats, nil
}

func (im *IndexMonitor) calculateEfficiency(stats *IndexStats) string {
if stats.UsageCount == 0 {
return "unused"
}

if stats.HitRatio < 0.1 {
return "poor"
}

return "good"
}

// 自动优化建议
func (im *IndexMonitor) GenerateOptimizationSuggestions(healthReports []IndexHealth) []string {
var suggestions []string

for _, health := range healthReports {
switch health.Efficiency {
case "unused":
suggestions = append(suggestions,
fmt.Sprintf("Consider dropping unused index '%s' in collection '%s'",
health.IndexName, health.Collection))

case "poor":
suggestions = append(suggestions,
fmt.Sprintf("Index '%s' in collection '%s' has poor performance, consider rebuilding",
health.IndexName, health.Collection))
}

// 检查索引大小
if health.Size > 1024*1024*1024 { // 1GB
suggestions = append(suggestions,
fmt.Sprintf("Large index '%s' (%d bytes) in collection '%s' may need optimization",
health.IndexName, health.Size, health.Collection))
}
}

return suggestions
}

9. 分片环境下的索引策略

面试官: 在 MongoDB 分片集群中,索引策略有什么特殊考虑?如何选择合适的分片键?

分片键选择决策流程:

Go 代码示例:

// 分片环境下的索引设计
type ShardedCollectionManager struct {
client *mongo.Client
dbName string
}

func (scm *ShardedCollectionManager) SetupShardedCollection(collName string) error {
adminDB := scm.client.Database("admin")

// 1. 启用数据库分片
result := adminDB.RunCommand(context.TODO(), bson.D{
{"enableSharding", scm.dbName},
})
if result.Err() != nil {
return result.Err()
}

// 2. 创建分片键索引(必须先创建索引)
coll := scm.client.Database(scm.dbName).Collection(collName)
shardKeyIndex := mongo.IndexModel{
Keys: bson.D{{"user_id", "hashed"}}, // 哈希分片
Options: options.Index().SetName("shard_key_index"),
}

_, err := coll.Indexes().CreateOne(context.TODO(), shardKeyIndex)
if err != nil {
return err
}

// 3. 设置分片键
shardCmd := bson.D{
{"shardCollection", scm.dbName + "." + collName},
{"key", bson.D{{"user_id", "hashed"}}},
}

result = adminDB.RunCommand(context.TODO(), shardCmd)
return result.Err()
}

// 为分片集合创建优化索引
func (scm *ShardedCollectionManager) CreateShardOptimizedIndexes(collName string) error {
coll := scm.client.Database(scm.dbName).Collection(collName)

indexes := []mongo.IndexModel{
// 1. 包含分片键的复合索引 - 支持单分片查询
{
Keys: bson.D{{"user_id", 1}, {"created_at", -1}},
Options: options.Index().SetName("user_created_compound"),
},

// 2. 包含分片键的查询索引
{
Keys: bson.D{{"user_id", 1}, {"status", 1}},
Options: options.Index().SetName("user_status_compound"),
},

// 3. 非分片键索引 - 会导致广播查询,谨慎使用
{
Keys: bson.D{{"email", 1}},
Options: options.Index().
SetSparse(true).
SetName("email_sparse"), // 使用稀疏索引减少开销
},
}

_, err := coll.Indexes().CreateMany(context.TODO(), indexes)
return err
}

// 分片查询优化
type ShardedQueryOptimizer struct {
collection *mongo.Collection
}

func (sqo *ShardedQueryOptimizer) OptimizedUserQuery(userID string, filters bson.D) (*mongo.Cursor, error) {
// 确保查询包含分片键,实现单分片路由
shardKeyFilter := bson.D{{"user_id", userID}}

// 合并其他过滤条件
combinedFilter := append(shardKeyFilter, filters...)

// 添加hint以确保使用正确的索引
opts := options.Find().SetHint(bson.D{{"user_id", 1}, {"created_at", -1}})

return sqo.collection.Find(context.TODO(), combinedFilter, opts)
}

// 监控分片查询性能
func (sqo *ShardedQueryOptimizer) AnalyzeShardDistribution() error {
// 获取分片统计信息
result := sqo.collection.Database().RunCommand(context.TODO(), bson.D{
{"shardDistribution", sqo.collection.Name()},
})

var shardStats bson.M
if err := result.Decode(&shardStats); err != nil {
return err
}

// 分析数据分布是否均匀
// 实现具体的分析逻辑...

return nil
}

10. 索引故障排查

面试官: 在生产环境中遇到查询性能问题时,你会如何系统性地排查索引相关问题?

Go 实现的故障排查工具:

type IndexTroubleshooter struct {
client *mongo.Client
db *mongo.Database
}

type PerformanceIssue struct {
Collection string `json:"collection"`
Query bson.M `json:"query"`
ExecutionTime time.Duration `json:"execution_time"`
DocsExamined int64 `json:"docs_examined"`
DocsReturned int64 `json:"docs_returned"`
IndexUsed string `json:"index_used"`
Issues []string `json:"issues"`
Suggestions []string `json:"suggestions"`
}

func (it *IndexTroubleshooter) DiagnoseSlowQuery(
collectionName string,
query bson.M,
) (*PerformanceIssue, error) {

coll := it.db.Collection(collectionName)

// 1. 执行 explain 分析
explainResult, err := it.explainQuery(coll, query)
if err != nil {
return nil, err
}

// 2. 分析执行统计
issue := &PerformanceIssue{
Collection: collectionName,
Query: query,
}

it.analyzeExplainResult(explainResult, issue)

// 3. 检查索引状态
it.checkIndexHealth(coll, issue)

// 4. 生成优化建议
it.generateSuggestions(issue)

return issue, nil
}

func (it *IndexTroubleshooter) explainQuery(
coll *mongo.Collection,
query bson.M,
) (bson.M, error) {

pipeline := mongo.Pipeline{
bson.D{{"$match", query}},
bson.D{{"$limit", 1}},
}

cursor, err := coll.Aggregate(context.TODO(), pipeline,
options.Aggregate().SetExplain(true))
if err != nil {
return nil, err
}
defer cursor.Close(context.TODO())

var result bson.M
if cursor.Next(context.TODO()) {
err = cursor.Decode(&result)
}

return result, err
}

func (it *IndexTroubleshooter) analyzeExplainResult(
explainResult bson.M,
issue *PerformanceIssue,
) {

// 解析执行统计信息
if stages, ok := explainResult["stages"].(bson.A); ok {
for _, stage := range stages {
if stageDoc, ok := stage.(bson.M); ok {
if executionStats, ok := stageDoc["executionStats"].(bson.M); ok {
// 提取关键指标
if examined, ok := executionStats["totalDocsExamined"].(int64); ok {
issue.DocsExamined = examined
}
if returned, ok := executionStats["totalDocsReturned"].(int64); ok {
issue.DocsReturned = returned
}
if execTime, ok := executionStats["executionTimeMillis"].(int64); ok {
issue.ExecutionTime = time.Duration(execTime) * time.Millisecond
}
}

// 检查是否使用了索引
if winningPlan, ok := stageDoc["winningPlan"].(bson.M); ok {
it.analyzeQueryPlan(winningPlan, issue)
}
}
}
}

// 分析性能问题
if issue.DocsExamined > 0 && issue.DocsReturned > 0 {
ratio := float64(issue.DocsExamined) / float64(issue.DocsReturned)
if ratio > 100 {
issue.Issues = append(issue.Issues,
fmt.Sprintf("High document scan ratio: %.2f", ratio))
}
}

if issue.ExecutionTime > 100*time.Millisecond {
issue.Issues = append(issue.Issues,
fmt.Sprintf("Slow execution time: %v", issue.ExecutionTime))
}
}

func (it *IndexTroubleshooter) analyzeQueryPlan(
plan bson.M,
issue *PerformanceIssue,
) {

if stage, ok := plan["stage"].(string); ok {
switch stage {
case "COLLSCAN":
issue.Issues = append(issue.Issues, "Using collection scan (no index)")
issue.IndexUsed = "NONE"

case "IXSCAN":
if indexName, ok := plan["indexName"].(string); ok {
issue.IndexUsed = indexName
}

case "FETCH":
// 检查是否有嵌套的索引扫描
if inputStage, ok := plan["inputStage"].(bson.M); ok {
it.analyzeQueryPlan(inputStage, issue)
}
}
}
}

func (it *IndexTroubleshooter) checkIndexHealth(
coll *mongo.Collection,
issue *PerformanceIssue,
) {

// 获取集合的所有索引
cursor, err := coll.Indexes().List(context.TODO())
if err != nil {
return
}
defer cursor.Close(context.TODO())

var indexes []bson.M
if err = cursor.All(context.TODO(), &indexes); err != nil {
return
}

// 检查是否有合适的索引覆盖查询字段
queryFields := it.extractQueryFields(issue.Query)
hasMatchingIndex := false

for _, index := range indexes {
if keys, ok := index["key"].(bson.M); ok {
if it.indexCoversQuery(keys, queryFields) {
hasMatchingIndex = true
break
}
}
}

if !hasMatchingIndex {
issue.Issues = append(issue.Issues, "No suitable index found for query fields")
}
}

func (it *IndexTroubleshooter) extractQueryFields(query bson.M) []string {
var fields []string
for key := range query {
if key != "$and" && key != "$or" {
fields = append(fields, key)
}
}
return fields
}

func (it *IndexTroubleshooter) indexCoversQuery(
indexKeys bson.M,
queryFields []string,
) bool {

indexFieldCount := 0
for field := range indexKeys {
for _, queryField := range queryFields {
if field == queryField {
indexFieldCount++
break
}
}
}

// 简单检查:索引是否覆盖了至少一个查询字段
return indexFieldCount > 0
}

func (it *IndexTroubleshooter) generateSuggestions(issue *PerformanceIssue) {
if issue.IndexUsed == "NONE" {
queryFields := it.extractQueryFields(issue.Query)
if len(queryFields) > 0 {
suggestion := fmt.Sprintf("Create index on: %v", queryFields)
issue.Suggestions = append(issue.Suggestions, suggestion)
}
}

if issue.DocsExamined > issue.DocsReturned*10 {
issue.Suggestions = append(issue.Suggestions,
"Consider using more selective query conditions")
}

if issue.ExecutionTime > 1*time.Second {
issue.Suggestions = append(issue.Suggestions,
"Consider adding compound index or optimizing query structure")
}
}

// 批量诊断工具
func (it *IndexTroubleshooter) BatchDiagnoseCollections() ([]*PerformanceIssue, error) {
// 从慢查询日志中获取问题查询
slowQueries, err := it.getSlowQueriesFromProfiler()
if err != nil {
return nil, err
}

var issues []*PerformanceIssue
for _, slowQuery := range slowQueries {
issue, err := it.DiagnoseSlowQuery(slowQuery.Collection, slowQuery.Query)
if err != nil {
continue
}
issues = append(issues, issue)
}

return issues, nil
}

type SlowQuery struct {
Collection string
Query bson.M
Duration time.Duration
}

func (it *IndexTroubleshooter) getSlowQueriesFromProfiler() ([]SlowQuery, error) {
// 从 system.profile 集合获取慢查询
profileColl := it.db.Collection("system.profile")

filter := bson.M{
"op": "query",
"millis": bson.M{"$gte": 100}, // 100ms以上的查询
}

cursor, err := profileColl.Find(context.TODO(), filter,
options.Find().SetSort(bson.D{{"millis", -1}}).SetLimit(50))
if err != nil {
return nil, err
}
defer cursor.Close(context.TODO())

var slowQueries []SlowQuery
for cursor.Next(context.TODO()) {
var profile bson.M
if err := cursor.Decode(&profile); err != nil {
continue
}

if ns, ok := profile["ns"].(string); ok {
parts := strings.Split(ns, ".")
if len(parts) >= 2 {
collection := parts[1]

if command, ok := profile["command"].(bson.M); ok {
if query, ok := command["filter"].(bson.M); ok {
duration := time.Duration(profile["millis"].(int64)) * time.Millisecond

slowQueries = append(slowQueries, SlowQuery{
Collection: collection,
Query: query,
Duration: duration,
})
}
}
}
}
}

return slowQueries, nil
}

MongoDB vs MySQL 索引异同点

特性MySQL (InnoDB 引擎为例)MongoDB
底层数据结构B+Tree(大多数索引),InnoDB 默认聚簇索引B-Tree 为主(也支持哈希、地理位置索引、全文索引)
主键索引聚簇索引(表数据按主键顺序存储在 B+Tree 叶子节点上)_id 默认索引,但不聚簇,索引与数据物理上分离
辅助索引 (Secondary Index)非聚簇索引(叶子节点存储行主键,再回表查询数据)二级索引指向文档 _id,需要回表到 collection 取数据
存储方式索引和表数据在物理上耦合(聚簇)索引与文档分离存储,独立的 B-Tree 结构
覆盖查询支持(索引包含所需字段时可避免回表)也支持(若 projection 请求的字段完全在索引里)
自增主键常见(AUTO_INCREMENT默认 _idObjectId (时间 + 机器 + 随机),避免热点
特殊索引全文(FULLTEXT)、空间索引哈希索引、TTL 索引、地理空间索引、全文索引
索引优化重点主键设计(避免自增导致热点写入)、联合索引顺序、覆盖查询复合索引匹配规则(前缀匹配)、避免过大索引、TTL 场景化索引
碎片问题聚簇索引会随数据更新产生碎片索引文件独立,也可能产生碎片,需要 compactreIndex

1. MySQL (InnoDB) 聚簇索引

✅ 特点:

  • 主索引(Primary Key)即数据存储(聚簇索引)
  • 非主键索引叶子节点存放的是 主键值,查询时必须“回表”才能获取数据

2. MongoDB _id 索引 (非聚簇)

✅ 特点:

  • _id 是默认唯一索引,但索引树和 Collection 数据文件是分离的
  • 索引叶子节点只存储 _id 和指向文档的指针,获取完整数据一定要查 collection

3. 对比图(聚簇 vs 非聚簇)